Weighted Split Scheduling #16668
Conversation
@pettyjamesm hi, I glanced over the PR. It seems that you want to assign a weight according to file size (Hive) and schedule by split weight instead of split count. I guess it may resolve the skew problem; do we have any test report? In addition, is it also useful for splittable formats, e.g. Parquet? :)
That's correct, all scheduling decisions now operate based on split weights (except for the per-task "unacknowledged split count", which is still limited as an absolute count).
I'll add the details of the experiment setup I used to the PR description shortly, but I tested this with JSON and Parquet small files and saw a ~2.5x improvement for Parquet and a ~4x improvement for JSON on the entire TPCH suite for small files with these changes, and no change for "normal" size files (as expected). The Parquet improvement is less dramatic than the JSON scenario because the individual small reads against S3 for each data page are much less efficient than a single streaming read like you get with JSON. In either case, this change dramatically increases worker utilization, whereas before the workers would complete work more quickly than the coordinator could deliver splits, which is the problem the scheduler-side change attempts to address. If you couple it with smarter Parquet I/O strategies, then the Parquet small-file performance can increase even more dramatically (in one specific scenario tested, pre-reading Parquet files < 1MB in size into memory improved performance by ~8x).
@pettyjamesm What cluster size was the benchmark run on? And how many files were there?
It's a great test result! We also do some work when it comes to small files: we don't change the size of
@yingsu00 The cluster used r5.8xlarge instances with 5 worker nodes and one coordinator. I don't have the datasets anymore to confirm the exact total file count in the 6 TPCH tables, but the metrics I still have lying around for individual queries corroborate that the file counts matched the TPCH SF10 schema row counts. The small-file datasets limited each file to 3,000 rows and the "normal" datasets limited each file to 10M rows, meaning, for example, the
Can we resolve the conflict and update the PR to the newest version? (From trinodb/trino#9059)
@@ -587,7 +591,28 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
        ENABLE_LOOSE_MEMORY_BASED_ACCOUNTING,
        "Enable loose memory accounting to avoid OOMing existing queries",
        hiveClientConfig.isLooseMemoryAccountingEnabled(),
-       false));
+       false),
+       booleanProperty(
Since this feature can be controlled at the per-query level, the node split stats (getTotalSplitsWeight / getQueuedSplitsWeightForStage) would actually not be accurate.
For example, suppose the feature is turned off by default and only one query has it turned on. For this query, the split stats from previous queries (which are still running) are computed using a different split weight calculation standard than this query's.
In an environment with some queries enabling this feature and other queries not enabling it:
- It's true that different weight calculations would be used for the splits associated with the queries (disabled: all splits get the "standard" weight; enabled: each split's weight is calculated based on its size in bytes).
- I wouldn't say that this means the node split stats are inaccurate: at all times the values are computed as the sum of the weights of the splits assigned, it's just that the weights have been computed differently between queries. It's worth noting that `getQueuedSplitsWeightForStage()` is a per-task metric, so there will be no interaction between queries on that metric, but the node-level `getTotalSplitsWeight()` will have an interaction between the two queries. In a sense, this is unavoidable so long as the setting exists, but I don't think it's actually a problem, since the current implementation when enabled will only weight splits as "smaller than standard" and never "larger than standard". The net effect is that queries with small files will still be allowed a higher level of parallelism than before, even when other queries with small files have this feature disabled.
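The delta-based accounting described in this thread (per-task metrics feeding a node-level total) can be sketched as follows. This is an illustrative toy, not the Presto source: the class and method names `NodeWeightTrackerSketch` and `TaskCursor` are hypothetical, though `getTotalSplitsWeight` mirrors the metric named above.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of node-level weight accounting via per-task deltas (hypothetical
// names; only getTotalSplitsWeight corresponds to a metric named in this PR).
public final class NodeWeightTrackerSketch
{
    private final AtomicLong totalSplitsWeight = new AtomicLong();

    // Per-task cursor remembering the last cumulative weight the task reported
    public final class TaskCursor
    {
        private long lastReported;

        public void update(long cumulativeWeight)
        {
            long delta = cumulativeWeight - lastReported;
            lastReported = cumulativeWeight;
            // integer arithmetic: deltas applied in any order sum exactly,
            // so the node total never drifts from the true value
            totalSplitsWeight.addAndGet(delta);
        }
    }

    public TaskCursor newTask()
    {
        return new TaskCursor();
    }

    public long getTotalSplitsWeight()
    {
        return totalSplitsWeight.get();
    }

    public static void main(String[] args)
    {
        NodeWeightTrackerSketch node = new NodeWeightTrackerSketch();
        TaskCursor a = node.newTask();
        TaskCursor b = node.newTask();
        a.update(300); // task a holds weight 300
        b.update(150); // task b holds weight 150
        a.update(100); // task a completed splits worth 200 weight
        System.out.println(node.getTotalSplitsWeight()); // 250
    }
}
```

The point of the sketch is the comment in `update`: because the representation is integral, the sum is exact regardless of the interleaving of updates from different tasks, which is the drift-avoidance property discussed later in the thread.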
Yeah, I'm hoping to drive the conversations in that PR to a conclusion before re-syncing the two PRs, since the effort to make revisions to both open PRs as feedback comes in is high, and it sounds like there will be at least one more round of changes to come.
Hey @yulongfufu, that sounds interesting. Can you share your PR with us as well?
I picked a demo, #16850, from my branch.
@@ -25,6 +26,7 @@
 public final class SplitWeight
 {
     private static final long UNIT_VALUE = 100;
I really don't think using long is a good idea.
First, by using long with a UNIT_VALUE of 100, it loses precision and finer granularity, since it can only represent 100 granules.
Second, this could regress performance instead of improving it as you originally intended. Since you used long in the SPI and double in the connector, you have to convert the value between the two:
public static SplitWeight fromProportion(double weight)
{
return fromRawValue((long) Math.ceil(weight * UNIT_VALUE));
}
The use of Math.ceil() is in fact much more expensive than ADD instructions on double-precision floating-point numbers. An ADD on double takes 4 cycles on an Intel CPU (Nehalem), while ceil() or floor() takes tens or hundreds of cycles: https://stackoverflow.com/questions/23203710/floating-point-operations-per-cycle-intel . You also have to do a multiplication, weight * UNIT_VALUE, instead of a simple assignment, and multiplication is more expensive than ADD on double or integer types on most CPUs. None of this needs to happen if you just use double in both systems: you can simply pass the value from the connector to the scheduler, and the only arithmetic operation you need is mostly ADD, which should be much faster than this conversion. Furthermore, the double arithmetic is the least of the concerns in this scenario; listing a directory is many orders of magnitude more expensive than adding even millions of doubles. In the past I never saw split-assignment-related operations become CPU bottlenecks. You are welcome to write a JMH benchmark to test this.
Third, it adds extra logic and complexity to the code by maintaining two weight systems. The code becomes more complex, and developers get confused.
Based on the above, I think having two weight types is a bad idea. I don't know why the Trino side suggested this; is there something I'm missing? If there is no particular reason, I would strongly suggest unifying the two weighting systems.
Let me try to walk through the thinking behind how we ended up with this version of the `SplitWeight` representation in the original PR, and then through the changes made in the course of the Trino PR revisions.
First, I want to highlight the different places where these weight values are used, and their characteristics:
- The SPI: Each split must be able to assign a weight value, and connectors not supporting split weighting must have a default "standard" weight. Ideally, the "standard" weight and general weight calculation system should be intuitive and avoid creating compatibility headaches in the future.
- The `NodeScheduler` and supporting classes like `NodeTaskMap`, `NodeAssignmentStats`, and the split placement logic. This logic does a few things that create some real constraints on possible solutions:
  - The coordinator's view of a current task must separately track the combined weight of splits that are queued locally, queued remotely, and running, and it performs arithmetic on those separate weight sums to combine the locally queued weight with the weights from the last observed remote task status update.
  - The task-level accounting then contributes its computed view of the current task, via deltas from previous values, into the `NodeTaskMap`, which tracks the current total weight running on a node through these incrementally computed deltas for all tasks on a given node, over the course of that node's entire lifetime.
  - Failing to accurately account weights is a risk. This introduces the requirement that split weight arithmetic must not lose accuracy over arbitrarily ordered additions and subtractions of combined sums and the deltas between them; otherwise a node's state would drift away from the "correct" value and cause bad scheduling decisions in either direction (i.e. a node appears "full" when it's actually not running anything, or "empty" when it's actually fully loaded).
  - Split scheduling is latency sensitive, especially when splits are small, and the task / node state tracking updates all occur within critical sections or compareAndSet operations. The overhead of calculation should be minimized where possible. For what it's worth, I think `double` would be fine here, but `BigDecimal` would be a performance risk.
  - The scheduler only needs to understand the concept of a "standard" weight in order to translate the previous configuration properties that expressed "split counts" into equivalent "standard weight sum" values.
- Task and pipeline status updates inside of split assignments must also accumulate and perform arithmetic operations to determine the worker's view of currently queued / running / completed weights, which is similar to the coordinator-side problem above. It's also similar in that these accounting points happen in critical paths that are expected to be somewhat contended. Workers need not understand anything about the definition of "standard weight". The final bit of complexity that the task status updates introduce, however, is that however the state is computed and represented, it must be possible to report the current computed state value back to the coordinator in JSON format. This causes more problems, because floating point values must lose at least some precision when serialized to strings (e.g. 0.33333...) and might need a large number of digits to lose only an acceptable amount of precision.
With the above constraints, I chose to use a single `long` field in the first iteration of this PR, with a "standard" value of 100. That choice solved the problems associated with items 2 and 3 above, because arbitrarily ordered arithmetic over single and aggregated weights is not a problem with integer arithmetic, calculations are cheap to perform, and values serialize correctly and succinctly in JSON. `double` would probably perform sufficiently well, but risked precision loss beyond just that introduced by serialization. `BigDecimal` would probably have been able to reach a level of precision such that even with JSON serialization the loss was negligible, but it would have been a performance risk for the inner loop of split scheduling and the critical sections of the task and node status updates, which would be performing arbitrary-precision arithmetic continuously. This, unfortunately, forced me to compromise on item 1 initially.
In the Trino PR, the initial pushback on this attempt was similar:
- Computing weights "relative to 100" is awkward and unintuitive; computing them relative to 1.0 is more natural
- Exposing a magic hard-coded constant of 100 as the "standard" value would be cemented into the SPI contract from that point on and hard to change
- What if 100 "granules" isn't enough and we want to be able to express weights at higher resolution (e.g. 1,000 or 1M)?
The resolution was what you see updated in the latest version of this PR:
- The internal representation remains a `long` to avoid loss of accuracy in arithmetic and serialization.
- The preferred construction of weight values for connectors is via `SplitWeight.fromProportion(double)`, which is more intuitive but still allows changes to the granularity in the future if necessary. The performance of the one-time conversion from `double` to `long` at the point of creation in connectors is less critical than the performance of the sections doing arithmetic on the internal representation within the engine.
That leaves the last remaining item: is 100 enough granularity? In my experience with this setting, yes. Remember, the weighting mechanism is designed to make the coordinator's split assignment rate adaptive, to avoid bottlenecking on split scheduling latency and leaving workers underutilized. Once workers are given sufficiently many splits to stay busy, the goal is accomplished and no further granularity is especially important. This is incidentally part of the reason why the default "hive.minimum-assigned-split-weight" is 0.05 (1/20 of a standard split), i.e. at most 20x more splits allowed, and not 100x more splits. Different environments might have different needs, so if in the future we decide that 1,000 or 1M should become the definition of "standard", that's still an option with the solution in this current PR.
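To make the representation trade-offs discussed above concrete, here is a minimal sketch of the long-based weight type. The `UNIT_VALUE` of 100 and `fromProportion(double)` come from the PR itself; the rest (validation, the `standard()` accessor) is a simplified approximation, not the actual presto-spi class.

```java
// Sketch of the long-backed SplitWeight discussed in this thread.
// UNIT_VALUE and fromProportion mirror the PR; other details are illustrative.
public final class SplitWeight
{
    private static final long UNIT_VALUE = 100;
    private static final SplitWeight STANDARD = new SplitWeight(UNIT_VALUE);

    private final long value;

    private SplitWeight(long value)
    {
        this.value = value;
    }

    public static SplitWeight standard()
    {
        return STANDARD;
    }

    public static SplitWeight fromRawValue(long value)
    {
        if (value <= 0) {
            throw new IllegalArgumentException("weight must be positive: " + value);
        }
        return new SplitWeight(value);
    }

    // Connectors express weights relative to 1.0; Math.ceil guarantees a
    // positive raw value even for tiny proportions
    public static SplitWeight fromProportion(double weight)
    {
        return fromRawValue((long) Math.ceil(weight * UNIT_VALUE));
    }

    public long getRawValue()
    {
        return value;
    }

    public static void main(String[] args)
    {
        // arithmetic on raw long values is exact regardless of ordering
        long sum = fromProportion(0.05).getRawValue() + fromProportion(0.5).getRawValue();
        System.out.println(sum); // 5 + 50 = 55
    }
}
```

Note how the one-time `double`-to-`long` conversion is confined to construction, while all engine-side accounting stays in exact integer arithmetic, which is the core of the argument made in this comment.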
@pettyjamesm Thanks for the explanation. I see the existing split weights you added in May 2020 were already using long, changing to another type may be overkill, and converting between double and long may lose precision, so I'm fine with this approach now. One more catch: the "split" semantics have now changed to "standard split" in the related configurations. Also, would you please merge the last 3 commits into the previous ones?
Agreed, the names are no longer appropriate. I'll change the config property names and add the old names as
Sure thing, my plan was to keep adding incremental commits to make it easier to track the PR changes through review, but to collapse the commits down once the changes were more or less ready to merge. My plan for that was to have two commits, one for
@pettyjamesm thank you! I've been hesitant about whether we should rename these properties or just add a description to the existing names. If we change the property names, existing deployed clusters might fail because of these changes. @tdcmeehan, what's your take on this?
@pettyjamesm Since @tdcmeehan seems busy, let's choose the least intrusive way by just adding descriptions to these properties for now. We can always change their names in the future. What do you think?
@yingsu00 sure, I'll revert the property name changes but leave in the updated config description annotations and documentation changes (with the property name references changed back to the originals, of course). Shall I go ahead and squash the PR commits down into the two commits now, or after you have had a chance to review the final documentation commit separately?
@pettyjamesm the updated description looks good to me. Now let's squash the last 4 commits into the first 5, or if you prefer 2 commits, that's also fine. Thank you!
This change adds the notion of a SplitWeight to the presto-spi: a concept which allows connectors to indicate the amount of work associated with a given split, relative to some "standard" split. Connectors can choose to assign that weight based on whatever heuristics might be appropriate within the context of that connector, such as size in bytes or some other metric that might be known about the split at the time it is generated. The calculation need not be fully precise, but care should be taken when implementing weight calculations in connectors to avoid using weights that are very small or very large.

On the presto-main side, split weights are used to inform how many splits are allowed to be running on a given node or queued on a given task (still configured by node-scheduler.max-splits-per-node and node-scheduler.max-pending-splits-per-task, respectively). Those values are now interpreted as being relative to one "standard" split weight. When all splits are assigned the standard weight, the scheduler will behave the same way as before. However, when splits are assigned weights that are smaller, the scheduler will allow more of them to be assigned or queued to tasks. In effect, this allows the coordinator to assign enough splits to workers for them to stay busy between batches of split assignments when the amount of time workers take to complete individual splits is too short.

In order to control the maximum number of splits that might be delivered in a single task update request, the existing config parameter node-scheduler.max-unacknowledged-splits-per-task still controls the absolute count of splits that the coordinator will allow to be assigned to a given task without yet being acknowledged by the worker as received. This can be especially important if splits themselves have a large serialized JSON representation, in which case sending a large number of small (by weight) splits could create huge task update requests.
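The "interpreted as relative to one standard split weight" rule in the commit message above can be sketched as follows. This is a toy model assuming a standard weight of 100, as elsewhere in the PR; the class name and `tryAssign` method are hypothetical and only illustrate the admission arithmetic, not the actual NodeScheduler code.

```java
// Toy sketch of weight-based split admission (hypothetical names): a
// configured split count becomes a weight budget of count * standard weight.
public final class WeightedAssignmentSketch
{
    private static final long STANDARD_WEIGHT = 100; // weight of one "standard" split

    private final long maxWeightPerNode;
    private long assignedWeight;

    public WeightedAssignmentSketch(int maxSplitsPerNode)
    {
        // e.g. node-scheduler.max-splits-per-node, now read as a number
        // of standard-weight splits
        this.maxWeightPerNode = (long) maxSplitsPerNode * STANDARD_WEIGHT;
    }

    public boolean tryAssign(long splitWeight)
    {
        if (assignedWeight + splitWeight > maxWeightPerNode) {
            return false; // node is "full" by weight
        }
        assignedWeight += splitWeight;
        return true;
    }

    public static void main(String[] args)
    {
        // with max-splits-per-node = 2, two standard splits fill the node
        WeightedAssignmentSketch node = new WeightedAssignmentSketch(2);
        System.out.println(node.tryAssign(100) && node.tryAssign(100)); // true
        System.out.println(node.tryAssign(100)); // false: budget exhausted
        // ...but 40 splits of weight 5 (small files) would have fit instead
    }
}
```

With standard-weight splits the behavior matches the old count-based scheduler exactly; only small-weight splits unlock deeper queues, which is how the change stays backward compatible.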
Implements split weighting for HiveSplit instances based on their size. This behavior is enabled by default and can be disabled by setting the session property `size_based_split_weights_enabled=false` or the hive configuration property `hive.size-based-split-weights-enabled=false`.

When enabled, splits are assigned their weight based on their size in bytes relative to `hive.max-split-size`. Splits that are 1/2 of the max split size will be weighted as 1/2 of the standard split weight. Splits that are larger than the hive target split size (e.g. unsplittable files) are still assigned the standard split weight, to avoid scheduler interactions that might be harmful when extremely large weights are calculated. This is a conservative decision designed to prevent existing workloads from regressing, but it might be worth revisiting in the future.

Finally, when size-based split weights are enabled, no split will be assigned a weight smaller than the value set by the `minimum_assigned_split_weight` session property or the `hive.minimum-assigned-split-weight` configuration property, which defaults to 0.05 (proportional to 1.0, the weight of a standard split). This provides a mechanism to control how aggressively the scheduler will respond to the presence of small files. With the default configuration, files smaller than 5% of the target split size will still be assigned 5% of the standard split weight, allowing a maximum of 20x more splits to be running or queued.
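The size-to-weight rule in this commit message (proportion of `hive.max-split-size`, floored at the minimum weight and capped at the standard weight) can be sketched as below. The class name is modeled loosely on `SizeBasedSplitWeightProvider` from this PR, but the code is a simplified assumption-laden sketch, not the actual connector implementation.

```java
// Minimal sketch of size-based split weighting: proportion of the target
// split size, clamped to [minimumWeight, 1.0]. Names are illustrative.
public final class SizeBasedWeightSketch
{
    private final double minimumWeight;      // cf. hive.minimum-assigned-split-weight
    private final long targetSplitSizeBytes; // cf. hive.max-split-size

    public SizeBasedWeightSketch(double minimumWeight, long targetSplitSizeBytes)
    {
        this.minimumWeight = minimumWeight;
        this.targetSplitSizeBytes = targetSplitSizeBytes;
    }

    public double weightForSplitSize(long splitSizeBytes)
    {
        double proportion = (double) splitSizeBytes / targetSplitSizeBytes;
        // splits larger than the target size are capped at the standard
        // weight (1.0); small splits never drop below the configured minimum
        return Math.min(Math.max(proportion, minimumWeight), 1.0);
    }

    public static void main(String[] args)
    {
        SizeBasedWeightSketch provider = new SizeBasedWeightSketch(0.05, 64 * 1024 * 1024);
        System.out.println(provider.weightForSplitSize(32 * 1024 * 1024));  // 0.5
        System.out.println(provider.weightForSplitSize(1024));              // 0.05 (floor)
        System.out.println(provider.weightForSplitSize(256 * 1024 * 1024)); // 1.0 (cap)
    }
}
```

The clamp is what keeps every assigned weight between the minimum and the standard weight, matching the conservative behavior for unsplittable files described above.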
@yingsu00 changes have been squashed down into two commits, one for the changes in the engine core and another for the connector implementation in
@pettyjamesm Looks good, except I found that NodeSchedulerConfig.java was not updated. Would you please update the ConfigDescription for all 3 mentioned properties?
Are you referring to
The third (existing and unchanged) configuration property
It was a pre-existing change that I made in advance of this PR to add weighted scheduling, because it laid some of the necessary groundwork in terms of the required refactoring, but the intention was specifically to control the kind of behavior that can now occur with weighted scheduling potentially assigning many more splits to nodes. In any case, it has an existing
@yingsu00 @tdcmeehan any objections to calling this ready to merge (and merging)?
Confirmed with @yingsu00 via Slack that she's in favor of proceeding to merge this; merging now.
Docs are pending, but basically this change:
- Adds a `SplitWeight` field to `ConnectorSplit`, which allows connectors to indicate "this split is smaller than normal by a factor of X" (for Hive, currently the only implementation, this is based on size in bytes, i.e. small files)
- Changes the `NodeScheduler` and related classes to assign splits to workers based on their weight instead of just the split count alone

The effect of the above is that when splits are sized appropriately, no behavior changes, but when splits are small (i.e. when the Hive connector is processing small files) the worker split queues are allowed to be deeper to compensate, which significantly improves performance.
Description of Changes

Changes to `presto-spi` and `presto-main`

`ConnectorSplit`s now carry a `SplitWeight`, which by default returns a "standard" split weight but which can be overridden by connector implementations to influence split scheduling behavior.

All `NodeScheduler` split assignment decisions are now based on node total and task queued split weight totals instead of split counts, except for "task unacknowledged split counts" (a pre-existing behavior controlled by the `NodeSchedulerConfig` property `node-scheduler.max-unacknowledged-splits-per-task`). That configuration is now much more significant, since it can be used to control how large the individual task update requests sent from the coordinator to workers can get when a large number of splits with small weights are scheduled.

Changes to `presto-hive`
A version of split weighting for the hive connector is included, which is enabled by default but can be disabled by setting the Hive session property `size_based_split_weights_enabled=false` or the hive configuration property `hive.size-based-split-weights-enabled=false`. When disabled, all splits are assigned the standard weight.

When enabled, splits are assigned their weight based on their size in bytes relative to `hive.max-split-size`. Splits that are 1/2 of the max split size will be weighted as 1/2 of the standard split weight. In this implementation, no split will be assigned a weight smaller than the value set by the `minimum_assigned_split_weight` hive session property or the `hive.minimum-assigned-split-weight` configuration property (default: 5). This provides a mechanism to control how aggressively the scheduler will respond to the presence of small files. With the current standard split weight of 100, this means that split queues will at most be scheduled 20x deeper when all splits are smaller than 1/20th of the max split size.
Currently, splits that are greater than the `hive.max-split-size` value (e.g. unsplittable files) are also assigned the standard split weight, such that any given assigned weight will always fall between the minimum assigned weight and the standard weight. This is an implementation choice for the Hive connector, but not a strict requirement on the behavior that connectors might choose to implement in the future.

Benchmarks
TPCH scale factor 10 (10GB) suite datasets were generated in both Parquet and JSON:
TPCH suite execution time geomean measurements, collected on a cluster of r5.8xlarge instances, with 5 worker nodes (and one coordinator):
Baseline vs. Improved: (timing table values not preserved in this text)
Note that before weighted scheduling, both Parquet and JSON small files performed about the same, because both were bottlenecked on split scheduling throughput and latency. With weighted scheduling enabled, the bottleneck becomes worker I/O and decoding throughput, so Parquet and JSON perform differently as a result.